package com.handy.kafkaconsumer.daemon;

import com.handy.commonredis.utils.RedisUtil;
import com.handy.kafkaconsumer.util.IdGenerator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author hs
 * @Description: {}
 * @date 2019/12/20 16:04
 */
@Component
public class TestTopicConsumer {
    @Resource(name = "RedisUtil")
    private RedisUtil redisUtil;
    @Autowired
    private IdGenerator idGenerator;

    int a = 0;

    /**
     * 接收消息
     *
     * @param record
     */
    @KafkaListener(topics = "test_topic", groupId = "test_group")
    public void listen(ConsumerRecord<?, ?> record) {
        redisUtil.set("consumer" + idGenerator.GenId(), record.value());
        System.out.println("本次搜索日志kafka接收到消息" + record.value() + "第" + (a++) + "条,分区为:" + record.partition() + "key:" + record.key());
    }

}
